1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import org.jboss.netty.logging.InternalLogger;
19 import org.jboss.netty.logging.InternalLoggerFactory;
20 import org.jboss.netty.util.ExternalResourceReleasable;
21 import org.jboss.netty.util.internal.ExecutorUtil;
22
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import java.util.concurrent.atomic.AtomicInteger;
27
28 public abstract class AbstractNioBossPool<E extends Boss>
29 implements BossPool<E>, ExternalResourceReleasable {
30
31
32
33
34 private static final int INITIALIZATION_TIMEOUT = 10;
35
36 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractNioBossPool.class);
37
38 private final Boss[] bosses;
39 private final AtomicInteger bossIndex = new AtomicInteger();
40 private final Executor bossExecutor;
41 private final AtomicBoolean initialized = new AtomicBoolean(false);
42
43
44
45
46
47
48
49 AbstractNioBossPool(Executor bossExecutor, int bossCount) {
50 this(bossExecutor, bossCount, true);
51 }
52
53 AbstractNioBossPool(Executor bossExecutor, int bossCount, boolean autoInit) {
54 if (bossExecutor == null) {
55 throw new NullPointerException("bossExecutor");
56 }
57 if (bossCount <= 0) {
58 throw new IllegalArgumentException(
59 "bossCount (" + bossCount + ") " +
60 "must be a positive integer.");
61 }
62 bosses = new Boss[bossCount];
63 this.bossExecutor = bossExecutor;
64 if (autoInit) {
65 init();
66 }
67 }
68
69 protected void init() {
70 if (!initialized.compareAndSet(false, true)) {
71 throw new IllegalStateException("initialized already");
72 }
73
74 for (int i = 0; i < bosses.length; i++) {
75 bosses[i] = newBoss(bossExecutor);
76 }
77
78 waitForBossThreads();
79 }
80
81 private void waitForBossThreads() {
82 long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(INITIALIZATION_TIMEOUT);
83 boolean warn = false;
84 for (Boss boss: bosses) {
85 if (!(boss instanceof AbstractNioSelector)) {
86 continue;
87 }
88
89 AbstractNioSelector selector = (AbstractNioSelector) boss;
90 long waitTime = deadline - System.nanoTime();
91 try {
92 if (waitTime <= 0) {
93 if (selector.thread == null) {
94 warn = true;
95 break;
96 }
97 } else if (!selector.startupLatch.await(waitTime, TimeUnit.NANOSECONDS)) {
98 warn = true;
99 break;
100 }
101 } catch (InterruptedException ignore) {
102
103 Thread.currentThread().interrupt();
104 break;
105 }
106 }
107
108 if (warn) {
109 logger.warn(
110 "Failed to get all boss threads ready within " + INITIALIZATION_TIMEOUT + " second(s). " +
111 "Make sure to specify the executor which has more threads than the requested bossCount. " +
112 "If unsure, use Executors.newCachedThreadPool().");
113 }
114 }
115
116
117
118
119
120
121
122
123 protected abstract E newBoss(Executor executor);
124
125 @SuppressWarnings("unchecked")
126 public E nextBoss() {
127 return (E) bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];
128 }
129
130 public void rebuildSelectors() {
131 for (Boss boss: bosses) {
132 boss.rebuildSelector();
133 }
134 }
135
136 public void releaseExternalResources() {
137 shutdown();
138 ExecutorUtil.shutdownNow(bossExecutor);
139 }
140
141 public void shutdown() {
142 for (Boss boss: bosses) {
143 boss.shutdown();
144 }
145 }
146 }