1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.jboss.netty.channel.socket.nio;
18
19 import org.jboss.netty.channel.socket.Worker;
20 import org.jboss.netty.logging.InternalLogger;
21 import org.jboss.netty.logging.InternalLoggerFactory;
22 import org.jboss.netty.util.ExternalResourceReleasable;
23 import org.jboss.netty.util.internal.ExecutorUtil;
24
25 import java.util.concurrent.Executor;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import java.util.concurrent.atomic.AtomicInteger;
29
30
31
32
33
34 public abstract class AbstractNioWorkerPool<E extends AbstractNioWorker>
35 implements WorkerPool<E>, ExternalResourceReleasable {
36
37
38
39
40 private static final int INITIALIZATION_TIMEOUT = 10;
41
42 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractNioWorkerPool.class);
43
44 private final AbstractNioWorker[] workers;
45 private final AtomicInteger workerIndex = new AtomicInteger();
46 private final Executor workerExecutor;
47 private final AtomicBoolean initialized = new AtomicBoolean(false);
48
49
50
51
52
53
54
55 AbstractNioWorkerPool(Executor workerExecutor, int workerCount) {
56 this(workerExecutor, workerCount, true);
57 }
58
59 AbstractNioWorkerPool(Executor workerExecutor, int workerCount, boolean autoInit) {
60 if (workerExecutor == null) {
61 throw new NullPointerException("workerExecutor");
62 }
63 if (workerCount <= 0) {
64 throw new IllegalArgumentException(
65 "workerCount (" + workerCount + ") " + "must be a positive integer.");
66 }
67 workers = new AbstractNioWorker[workerCount];
68 this.workerExecutor = workerExecutor;
69 if (autoInit) {
70 init();
71 }
72 }
73
74 protected void init() {
75 if (!initialized.compareAndSet(false, true)) {
76 throw new IllegalStateException("initialized already");
77 }
78
79 for (int i = 0; i < workers.length; i++) {
80 workers[i] = newWorker(workerExecutor);
81 }
82
83 waitForWorkerThreads();
84 }
85
86 private void waitForWorkerThreads() {
87 long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(INITIALIZATION_TIMEOUT);
88 boolean warn = false;
89 for (AbstractNioSelector worker: workers) {
90 long waitTime = deadline - System.nanoTime();
91 try {
92 if (waitTime <= 0) {
93 if (worker.thread == null) {
94 warn = true;
95 break;
96 }
97 } else if (!worker.startupLatch.await(waitTime, TimeUnit.NANOSECONDS)) {
98 warn = true;
99 break;
100 }
101 } catch (InterruptedException ignore) {
102
103 Thread.currentThread().interrupt();
104 break;
105 }
106 }
107
108 if (warn) {
109 logger.warn(
110 "Failed to get all worker threads ready within " + INITIALIZATION_TIMEOUT + " second(s). " +
111 "Make sure to specify the executor which has more threads than the requested workerCount. " +
112 "If unsure, use Executors.newCachedThreadPool().");
113 }
114 }
115
116
117
118
119
120
121
122 protected abstract E newWorker(Executor executor);
123
124 @SuppressWarnings("unchecked")
125 public E nextWorker() {
126 return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
127 }
128
129 public void rebuildSelectors() {
130 for (AbstractNioWorker worker: workers) {
131 worker.rebuildSelector();
132 }
133 }
134
135 public void releaseExternalResources() {
136 shutdown();
137 ExecutorUtil.shutdownNow(workerExecutor);
138 }
139
140 public void shutdown() {
141 for (AbstractNioWorker worker: workers) {
142 worker.shutdown();
143 }
144 }
145
146 }