001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.nio;
018
019import java.io.IOException;
020import java.nio.channels.SocketChannel;
021import java.util.LinkedList;
022import java.util.concurrent.Executor;
023import java.util.concurrent.ExecutorService;
024import java.util.concurrent.SynchronousQueue;
025import java.util.concurrent.ThreadFactory;
026import java.util.concurrent.ThreadPoolExecutor;
027import java.util.concurrent.TimeUnit;
028
029/**
030 * The SelectorManager will manage one Selector and the thread that checks the
031 * selector.
032 *
033 * We may need to consider running more than one thread to check the selector if
034 * servicing the selector takes too long.
035 */
036public final class SelectorManager {
037
038    public static final SelectorManager SINGLETON = new SelectorManager();
039
040    private Executor selectorExecutor = createDefaultExecutor();
041    private Executor channelExecutor = selectorExecutor;
042    private LinkedList<SelectorWorker> freeWorkers = new LinkedList<SelectorWorker>();
043    private int maxChannelsPerWorker = 1024;
044
045    protected ExecutorService createDefaultExecutor() {
046        ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
047
048            private long i = 0;
049
050            public Thread newThread(Runnable runnable) {
051                this.i++;
052                final Thread t = new Thread(runnable, "ActiveMQ NIO Worker " + this.i);
053                return t;
054            }
055        });
056
057        return rc;
058    }
059
060    public static SelectorManager getInstance() {
061        return SINGLETON;
062    }
063
064    public interface Listener {
065        void onSelect(SelectorSelection selector);
066        void onError(SelectorSelection selection, Throwable error);
067    }
068
069    public synchronized SelectorSelection register(SocketChannel socketChannel, Listener listener)
070        throws IOException {
071
072        SelectorSelection selection = null;
073        while( selection == null ) {
074            if (freeWorkers.size() > 0) {
075                SelectorWorker worker = freeWorkers.getFirst();
076                if( worker.isReleased() ) {
077                    freeWorkers.remove(worker);
078                } else {
079                    worker.retain();
080                    selection = new SelectorSelection(worker, socketChannel, listener);
081                }
082            } else {
083                // Worker starts /w retain count of 1
084                SelectorWorker worker = new SelectorWorker(this);
085                freeWorkers.addFirst(worker);
086                selection = new SelectorSelection(worker, socketChannel, listener);
087            }
088        }
089
090        return selection;
091    }
092
093    synchronized void onWorkerFullEvent(SelectorWorker worker) {
094        freeWorkers.remove(worker);
095    }
096
097    public synchronized void onWorkerEmptyEvent(SelectorWorker worker) {
098        freeWorkers.remove(worker);
099    }
100
101    public synchronized void onWorkerNotFullEvent(SelectorWorker worker) {
102        freeWorkers.addFirst(worker);
103    }
104
105    public Executor getChannelExecutor() {
106        return channelExecutor;
107    }
108
109    public void setChannelExecutor(Executor channelExecutor) {
110        this.channelExecutor = channelExecutor;
111    }
112
113    public int getMaxChannelsPerWorker() {
114        return maxChannelsPerWorker;
115    }
116
117    public void setMaxChannelsPerWorker(int maxChannelsPerWorker) {
118        this.maxChannelsPerWorker = maxChannelsPerWorker;
119    }
120
121    public Executor getSelectorExecutor() {
122        return selectorExecutor;
123    }
124
125    public void setSelectorExecutor(Executor selectorExecutor) {
126        this.selectorExecutor = selectorExecutor;
127    }
128}