1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import org.jboss.netty.util.ExternalResourceReleasable;
19 import org.jboss.netty.util.internal.ExecutorUtil;
20
21 import java.util.concurrent.Executor;
22 import java.util.concurrent.atomic.AtomicInteger;
23
24 public abstract class AbstractNioBossPool<E extends Boss>
25 implements BossPool<E>, ExternalResourceReleasable {
26
27 private final Boss[] bosses;
28 private final AtomicInteger bossIndex = new AtomicInteger();
29 private final Executor bossExecutor;
30 private volatile boolean initDone;
31
32
33
34
35
36
37
38 AbstractNioBossPool(Executor bossExecutor, int bossCount) {
39 this(bossExecutor, bossCount, true);
40 }
41
42 AbstractNioBossPool(Executor bossExecutor, int bossCount, boolean autoInit) {
43 if (bossExecutor == null) {
44 throw new NullPointerException("bossExecutor");
45 }
46 if (bossCount <= 0) {
47 throw new IllegalArgumentException(
48 "bossCount (" + bossCount + ") " +
49 "must be a positive integer.");
50 }
51 bosses = new Boss[bossCount];
52 this.bossExecutor = bossExecutor;
53 if (autoInit) {
54 init();
55 }
56 }
57
58 protected void init() {
59 if (initDone) {
60 throw new IllegalStateException("Init was done before");
61 }
62 initDone = true;
63
64 for (int i = 0; i < bosses.length; i++) {
65 bosses[i] = newBoss(bossExecutor);
66 }
67 }
68
69
70
71
72
73
74
75
76 protected abstract E newBoss(Executor executor);
77
78 @SuppressWarnings("unchecked")
79 public E nextBoss() {
80 return (E) bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];
81 }
82
83 public void rebuildSelectors() {
84 for (Boss boss: bosses) {
85 boss.rebuildSelector();
86 }
87 }
88
89 public void releaseExternalResources() {
90 shutdown();
91 ExecutorUtil.shutdownNow(bossExecutor);
92 }
93
94 public void shutdown() {
95 for (Boss boss: bosses) {
96 boss.shutdown();
97 }
98 }
99 }