View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package org.jboss.netty.channel.socket.nio;
18  
19  import org.jboss.netty.channel.socket.Worker;
20  import org.jboss.netty.logging.InternalLogger;
21  import org.jboss.netty.logging.InternalLoggerFactory;
22  import org.jboss.netty.util.ExternalResourceReleasable;
23  import org.jboss.netty.util.internal.ExecutorUtil;
24  
25  import java.util.concurrent.Executor;
26  import java.util.concurrent.TimeUnit;
27  import java.util.concurrent.atomic.AtomicBoolean;
28  import java.util.concurrent.atomic.AtomicInteger;
29  
30  /**
31   * Abstract base class for {@link WorkerPool} implementations that create the {@link Worker}'s
32   * up-front and return them in a "fair" fashion when calling {@link #nextWorker()}
33   */
34  public abstract class AbstractNioWorkerPool<E extends AbstractNioWorker>
35          implements WorkerPool<E>, ExternalResourceReleasable {
36  
37      /**
38       * The worker pool raises an exception unless all worker threads start and run within this timeout (in seconds.)
39       */
40      private static final int INITIALIZATION_TIMEOUT = 10;
41  
42      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractNioWorkerPool.class);
43  
44      private final AbstractNioWorker[] workers;
45      private final AtomicInteger workerIndex = new AtomicInteger();
46      private final Executor workerExecutor;
47      private final AtomicBoolean initialized = new AtomicBoolean(false);
48  
49      /**
50       * Create a new instance
51       *
52       * @param workerExecutor the {@link Executor} to use for the {@link Worker}'s
53       * @param workerCount the count of {@link Worker}'s to create
54       */
55      AbstractNioWorkerPool(Executor workerExecutor, int workerCount) {
56          this(workerExecutor, workerCount, true);
57      }
58  
59      AbstractNioWorkerPool(Executor workerExecutor, int workerCount, boolean autoInit) {
60          if (workerExecutor == null) {
61              throw new NullPointerException("workerExecutor");
62          }
63          if (workerCount <= 0) {
64              throw new IllegalArgumentException(
65                      "workerCount (" + workerCount + ") " + "must be a positive integer.");
66          }
67          workers = new AbstractNioWorker[workerCount];
68          this.workerExecutor = workerExecutor;
69          if (autoInit) {
70              init();
71          }
72      }
73  
74      protected void init() {
75          if (!initialized.compareAndSet(false, true)) {
76              throw new IllegalStateException("initialized already");
77          }
78  
79          for (int i = 0; i < workers.length; i++) {
80              workers[i] = newWorker(workerExecutor);
81          }
82  
83          waitForWorkerThreads();
84      }
85  
86      private void waitForWorkerThreads() {
87          long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(INITIALIZATION_TIMEOUT);
88          boolean warn = false;
89          for (AbstractNioSelector worker: workers) {
90              long waitTime = deadline - System.nanoTime();
91              try {
92                  if (waitTime <= 0) {
93                      if (worker.thread == null) {
94                          warn = true;
95                          break;
96                      }
97                  } else if (!worker.startupLatch.await(waitTime, TimeUnit.NANOSECONDS)) {
98                      warn = true;
99                      break;
100                 }
101             } catch (InterruptedException ignore) {
102                 // Stop waiting for the worker threads and let someone else take care of the interruption.
103                 Thread.currentThread().interrupt();
104                 break;
105             }
106         }
107 
108         if (warn) {
109             logger.warn(
110                     "Failed to get all worker threads ready within " + INITIALIZATION_TIMEOUT + " second(s). " +
111                     "Make sure to specify the executor which has more threads than the requested workerCount. " +
112                     "If unsure, use Executors.newCachedThreadPool().");
113         }
114     }
115 
116     /**
117      * Create a new {@link Worker} which uses the given {@link Executor} to service IO.
118      *
119      * @param executor the {@link Executor} to use
120      * @return worker the new {@link Worker}
121      */
122     protected abstract E newWorker(Executor executor);
123 
124     @SuppressWarnings("unchecked")
125     public E nextWorker() {
126         return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
127     }
128 
129     public void rebuildSelectors() {
130         for (AbstractNioWorker worker: workers) {
131             worker.rebuildSelector();
132         }
133     }
134 
135     public void releaseExternalResources() {
136         shutdown();
137         ExecutorUtil.shutdownNow(workerExecutor);
138     }
139 
140     public void shutdown() {
141         for (AbstractNioWorker worker: workers) {
142             worker.shutdown();
143         }
144     }
145 
146 }