001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.scheduler;
018
019import java.io.File;
020import java.util.concurrent.atomic.AtomicBoolean;
021
022import org.apache.activemq.ScheduledMessage;
023import org.apache.activemq.advisory.AdvisorySupport;
024import org.apache.activemq.broker.Broker;
025import org.apache.activemq.broker.BrokerFilter;
026import org.apache.activemq.broker.ConnectionContext;
027import org.apache.activemq.broker.ProducerBrokerExchange;
028import org.apache.activemq.command.ActiveMQDestination;
029import org.apache.activemq.command.Message;
030import org.apache.activemq.command.MessageId;
031import org.apache.activemq.command.ProducerId;
032import org.apache.activemq.command.ProducerInfo;
033import org.apache.activemq.openwire.OpenWireFormat;
034import org.apache.activemq.security.SecurityContext;
035import org.apache.activemq.state.ProducerState;
036import org.apache.activemq.util.IdGenerator;
037import org.apache.activemq.util.LongSequenceGenerator;
038import org.apache.activemq.util.TypeConversionSupport;
039import org.apache.activemq.wireformat.WireFormat;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042import org.apache.kahadb.util.ByteSequence;
043
044public class SchedulerBroker extends BrokerFilter implements JobListener {
045    private static final Logger LOG = LoggerFactory.getLogger(SchedulerBroker.class);
046    private static final IdGenerator ID_GENERATOR = new IdGenerator();
047    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
048    private final AtomicBoolean started = new AtomicBoolean();
049    private final WireFormat wireFormat = new OpenWireFormat();
050    private final ConnectionContext context = new ConnectionContext();
051    private final ProducerId producerId = new ProducerId();
052    private File directory;
053
054    private JobSchedulerStore store;
055    private JobScheduler scheduler;
056
057    public SchedulerBroker(Broker next, File directory) throws Exception {
058        super(next);
059        this.directory = directory;
060        this.producerId.setConnectionId(ID_GENERATOR.generateId());
061        this.context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
062        context.setBroker(next);
063        LOG.info("Scheduler using directory: " + directory);
064
065    }
066
067    public synchronized JobScheduler getJobScheduler() throws Exception {
068        return new JobSchedulerFacade(this);
069    }
070
071    /**
072     * @return the directory
073     */
074    public File getDirectory() {
075        return this.directory;
076    }
077    /**
078     * @param directory
079     *            the directory to set
080     */
081    public void setDirectory(File directory) {
082        this.directory = directory;
083    }
084
085    @Override
086    public void start() throws Exception {
087        this.started.set(true);
088        getInternalScheduler();
089        super.start();
090    }
091
092    @Override
093    public void stop() throws Exception {
094        if (this.started.compareAndSet(true, false)) {
095
096            if (this.store != null) {
097                this.store.stop();
098            }
099            if (this.scheduler != null) {
100                this.scheduler.removeListener(this);
101                this.scheduler = null;
102            }
103        }
104        super.stop();
105    }
106
107    @Override
108    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
109        long delay = 0;
110        long period = 0;
111        int repeat = 0;
112        String cronEntry = "";
113        String jobId = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
114        Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
115        Object periodValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
116        Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
117
118        String physicalName = messageSend.getDestination().getPhysicalName();
119        boolean schedularManage = physicalName.regionMatches(true, 0,
120                ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0,
121                ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION.length());
122
123        if (schedularManage == true) {
124
125            JobScheduler scheduler = getInternalScheduler();
126            ActiveMQDestination replyTo = messageSend.getReplyTo();
127
128            String action = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION);
129
130            if (action != null ) {
131
132                Object startTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME);
133                Object endTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME);
134
135                if (replyTo != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE)) {
136
137                    if( startTime != null && endTime != null ) {
138
139                        long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
140                        long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
141
142                        for (Job job : scheduler.getAllJobs(start, finish)) {
143                            sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
144                        }
145                    } else {
146                        for (Job job : scheduler.getAllJobs()) {
147                            sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
148                        }
149                    }
150                }
151                if (jobId != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE)) {
152                    scheduler.remove(jobId);
153                } else if (action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL)) {
154
155                    if( startTime != null && endTime != null ) {
156
157                        long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
158                        long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);
159
160                        scheduler.removeAllJobs(start, finish);
161                    } else {
162                        scheduler.removeAllJobs();
163                    }
164                }
165            }
166
167        } else if ((cronValue != null || periodValue != null || delayValue != null) && jobId == null) {
168            //clear transaction context
169            Message msg = messageSend.copy();
170            msg.setTransactionId(null);
171            org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(msg);
172            if (cronValue != null) {
173                cronEntry = cronValue.toString();
174            }
175            if (periodValue != null) {
176              period = (Long) TypeConversionSupport.convert(periodValue, Long.class);
177            }
178            if (delayValue != null) {
179                delay = (Long) TypeConversionSupport.convert(delayValue, Long.class);
180            }
181            Object repeatValue = msg.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
182            if (repeatValue != null) {
183                repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
184            }
185            getInternalScheduler().schedule(msg.getMessageId().toString(),
186                    new ByteSequence(packet.data, packet.offset, packet.length),cronEntry, delay, period, repeat);
187
188        } else {
189            super.send(producerExchange, messageSend);
190        }
191    }
192
193    public void scheduledJob(String id, ByteSequence job) {
194        org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getData(), job
195                .getOffset(), job.getLength());
196        try {
197            Message messageSend = (Message) this.wireFormat.unmarshal(packet);
198            messageSend.setOriginalTransactionId(null);
199            Object repeatValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
200            Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
201            String cronStr = cronValue != null ? cronValue.toString() : null;
202            int repeat = 0;
203            if (repeatValue != null) {
204                repeat = (Integer) TypeConversionSupport.convert(repeatValue, Integer.class);
205            }
206
207            if (repeat != 0 || cronStr != null && cronStr.length() > 0) {
208                // create a unique id - the original message could be sent
209                // lots of times
210                messageSend.setMessageId(
211                        new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
212            }
213
214            // Add the jobId as a property
215            messageSend.setProperty("scheduledJobId", id);
216
217            // if this goes across a network - we don't want it rescheduled
218            messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
219            messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
220            messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
221            messageSend.removeProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
222
223            if (messageSend.getTimestamp() > 0 && messageSend.getExpiration() > 0) {
224
225                long oldExpiration = messageSend.getExpiration();
226                long newTimeStamp = System.currentTimeMillis();
227                long timeToLive = 0;
228                long oldTimestamp = messageSend.getTimestamp();
229
230                if (oldExpiration > 0) {
231                    timeToLive = oldExpiration - oldTimestamp;
232                }
233
234                long expiration = timeToLive + newTimeStamp;
235
236                if(expiration > oldExpiration) {
237                    if (timeToLive > 0 && expiration > 0) {
238                        messageSend.setExpiration(expiration);
239                    }
240                    messageSend.setTimestamp(newTimeStamp);
241                    if (LOG.isDebugEnabled()) {
242                        LOG.debug("Set message " + messageSend.getMessageId() + " timestamp from " + oldTimestamp + " to " + newTimeStamp);
243                    }
244                }
245            }
246
247            final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
248            producerExchange.setConnectionContext(context);
249            producerExchange.setMutable(true);
250            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
251            super.send(producerExchange, messageSend);
252        } catch (Exception e) {
253            LOG.error("Failed to send scheduled message " + id, e);
254        }
255    }
256
257    protected synchronized JobScheduler getInternalScheduler() throws Exception {
258        if (this.started.get()) {
259            if (this.scheduler == null) {
260                this.scheduler = getStore().getJobScheduler("JMS");
261                this.scheduler.addListener(this);
262            }
263            return this.scheduler;
264        }
265        return null;
266    }
267
268    private JobSchedulerStore getStore() throws Exception {
269        if (started.get()) {
270            if (this.store == null) {
271                this.store = new JobSchedulerStore();
272                this.store.setDirectory(directory);
273                this.store.start();
274            }
275            return this.store;
276        }
277        return null;
278    }
279
280    protected void sendScheduledJob(ConnectionContext context, Job job, ActiveMQDestination replyTo)
281            throws Exception {
282
283        org.apache.activemq.util.ByteSequence packet = new org.apache.activemq.util.ByteSequence(job.getPayload());
284        try {
285            Message msg = (Message) this.wireFormat.unmarshal(packet);
286            msg.setOriginalTransactionId(null);
287            msg.setPersistent(false);
288            msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
289            msg.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
290            msg.setDestination(replyTo);
291            msg.setResponseRequired(false);
292            msg.setProducerId(this.producerId);
293
294            // Add the jobId as a property
295            msg.setProperty("scheduledJobId", job.getJobId());
296
297            final boolean originalFlowControl = context.isProducerFlowControl();
298            final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
299            producerExchange.setConnectionContext(context);
300            producerExchange.setMutable(true);
301            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
302            try {
303                context.setProducerFlowControl(false);
304                this.next.send(producerExchange, msg);
305            } finally {
306                context.setProducerFlowControl(originalFlowControl);
307            }
308        } catch (Exception e) {
309            LOG.error("Failed to send scheduled message " + job.getJobId(), e);
310        }
311
312    }
313}